package v0

import (
	"fmt"
	"gitee.com/ymofen/golang/gobase"
	"gitee.com/ymofen/panicsafe"
	"sync"
	"sync/atomic"
	"time"
)

/**
  push任务的时候可能会阻塞, 如果 chan通道满了会阻塞
	  需要手动调用CheckRunWorker 进行增加Worker运行(每次增加一个), 最多到WorkMax
	  需要手动调用CheckReleaseWorker 进行Worker的释放到 最少Woker

*/

type TaskPool struct {
	TaskPush int64
	TaskPop  int64

	workN      int32
	WorkMin    int32 //
	WorkMax    int32 //
	BusyCnt    int32 // 正在目录的线程
	Terminated int32

	maxCache   int
	check_tick *time.Ticker
	closeW     sync.WaitGroup
	taskChan   chan []interface{}
}

func NewTaskPool(maxCache int) *TaskPool {
	if maxCache <= 0 {
		maxCache = 1024
	}
	rval := &TaskPool{
		maxCache:   maxCache,
		taskChan:   make(chan []interface{}, maxCache<<2),
		check_tick: time.NewTicker(time.Second),
		WorkMin:    8,
		WorkMax:    1024,
	}
	return rval
}

type TaskWorker struct {
	GoID  uint64
	Owner *TaskPool
	Fixed int8
}

func (this *TaskWorker) innerCall(params []interface{}) {
	atomic.AddInt32(&this.Owner.BusyCnt, 1)
	defer atomic.AddInt32(&this.Owner.BusyCnt, -1)
	fn := params[0].(RunableProc)
	args := params[1].([]interface{})
	fn(this, args...)
}

func (this *TaskWorker) innerRunFixed() {
break_for:
	for {
		select {
		case params := <-this.Owner.taskChan:
			atomic.AddInt64(&this.Owner.TaskPop, 1)
			if params == nil || len(params) == 0 {
				break break_for
			}
			if this.Owner.Terminated == 0 {
				this.innerCall(params)
			}
		}
	}
}

func (this *TaskWorker) innerRun() {
break_for:
	for {
		select {
		case params := <-this.Owner.taskChan:
			atomic.AddInt64(&this.Owner.TaskPop, 1)
			if params == nil || len(params) == 0 {
				break break_for
			}
			if this.Owner.Terminated == 0 {
				this.innerCall(params)
			}
		case <-this.Owner.check_tick.C:
			if this.Owner.checkRelease() {
				go this.Owner.releaseWorker() // 异步投递避免堵塞
			}
		}
	}
}

func (this *TaskWorker) Run(beginfn func()) {
	go func() {
		if panicsafe.GoFunCatchException {
			defer panicsafe.DeferCatchPanic()
		}

		if beginfn != nil {
			beginfn()
		}

		r := this.Owner.tryAddWorker()
		if r {
			defer this.Owner.decWorker()
			this.GoID = gobase.GetCurrentGoRoutineID()
			if this.Fixed == 1 {
				this.innerRunFixed()
			} else {
				this.innerRun()
			}

		}
	}()
}

type RunableProc func(worker *TaskWorker, args ...interface{})

func (this *TaskPool) GetTaskNum() int {
	return len(this.taskChan)
}

func (this *TaskPool) WorkNum() int {
	n := atomic.LoadInt32(&this.workN)
	return int(n)
}

func (this *TaskPool) checkRunWorker() {
	n := atomic.LoadInt32(&this.workN)
	if n < this.WorkMax {
		var wg sync.WaitGroup
		l := atomic.LoadInt32(&this.BusyCnt)
		if l == n { // 都正在忙碌
			wg.Add(1)
			this.innerRunWorker(0, func() {
				wg.Done()
			})
		}
		wg.Wait()
	}
}

func (this *TaskPool) checkRelease() bool {
	n := atomic.LoadInt32(&this.workN)
	if n < this.WorkMin {
		return false
	}
	l := atomic.LoadInt32(&this.BusyCnt) // 正在忙碌
	if n-l > this.WorkMin {              // 释放
		return true //
	}
	return false

}
func (this *TaskPool) releaseWorker() {
	this.taskChan <- nil
	atomic.AddInt64(&this.TaskPush, 1)
}

func (this *TaskPool) IsTerminated() bool {
	return this.Terminated == 1
}

func (this *TaskPool) Status() string {
	return fmt.Sprintf("busy/n/min/max:%d/%d/%d/%d, chan:%d/%d, terminated:%d", this.BusyCnt, this.workN, this.WorkMin, this.WorkMax, len(this.taskChan), cap(this.taskChan), this.Terminated)
}

func (this *TaskPool) PostTask(fn RunableProc, args ...interface{}) bool {
	if this.Terminated == 1 {
		return false
	}

	this.checkRunWorker()

	params := make([]interface{}, 2)
	params[0] = fn
	params[1] = args

	// 加入超时机智
	select {
	case this.taskChan <- params:
	case <-time.After(time.Second):
		return false
	}

	atomic.AddInt64(&this.TaskPush, 1)
	return true
}

func (this *TaskPool) Close() {
	this.Terminated = 1
	n := this.workN
	for i := int32(0); i < n; i++ {
		this.taskChan <- nil
		atomic.AddInt64(&this.TaskPush, 1)
	}
	this.closeW.Wait()

}

func (this *TaskPool) tryAddWorker() bool {
	r := atomic.AddInt32(&this.workN, 1)
	if r <= this.WorkMax {
		this.closeW.Add(1)
		return true
	} else {
		atomic.AddInt32(&this.workN, -1)
		return false
	}
}

func (this *TaskPool) decWorker() {
	atomic.AddInt32(&this.workN, -1)
	this.closeW.Add(-1)
}

func (this *TaskPool) innerRunWorker(fixed int8, beginfn func()) {
	if this.Terminated == 1 {
		return
	}
	worker := &TaskWorker{Owner: this, Fixed: fixed}
	worker.Run(beginfn)
}

func (this *TaskPool) RunMinWorkers() {
	var wg sync.WaitGroup
	for i := this.workN; i <= this.WorkMin; i++ {
		wg.Add(1)
		this.innerRunWorker(1, func() {
			wg.Done()
		})
	}
	wg.Wait()
}
